#!/bin/bash
# File Name: start.sh
# Author: moshan
# mail: mo_shan@yeah.net
# Created Time: 2021-12-21 11:40:20
# Function: 对比两个MySQL的数据行是否一致
#########################################################################
set -e -o pipefail
export LANG=en_US.UTF-8

work_dir="/data/git/check_data_for_mysql"

log_addr='${BASH_SOURCE}:${FUNCNAME}:${LINENO}' #eval echo ${log_addr}

function f_usage()
{
    echo
    echo -e "$(printf %-16s "Usage: \033[35m$0")\033[0m\n"
    echo -e "$(printf %-16s ) \033[32m[ -t check_table ]             \033[33m需要检查的表列表, 默认整库"
    echo -e "$(printf %-16s ) \033[32m                               \033[34mdb.table : 表示是仅检查这个库下的这个表, 其他库的这个表就不检查"
    echo -e "$(printf %-16s ) \033[32m                               \033[34mtable : 表示所有库下的这个表都检查"
    echo
    echo -e "$(printf %-16s ) \033[32m[ -T skip_check_table ]        \033[33m不需要检查的表, 默认不过滤"
    echo -e "$(printf %-16s ) \033[32m                               \033[34mdb.table : 表示是仅这个库下的这个表不校验, 其他库的这个表会检查"
    echo -e "$(printf %-16s ) \033[32m                               \033[34mtable : 表示所有库下的这个表都不检查"
    echo
    echo -e "$(printf %-16s ) \033[32m[ -d check_db ]                \033[33m需要检查的库, 默认是除了系统库以外的所有库"
    echo -e "$(printf %-16s ) \033[32m[ -D skip_check_db ]           \033[33m不需要检查的库, 默认不过滤"
    echo -e "$(printf %-16s ) \033[32m[ -w threads ]                 \033[33m最大并发数, 总共开几个线程进行并行校验"
    echo
    echo -e "$(printf %-16s ) \033[32m[ -l limit_time ]              \033[33m哪些时间段允许跑校验, 默认是所有时间, 如需限制可以使用该参数进行限制, 多个时间用英文逗号隔开(1,5,10), 时间段可以用中划线连起来(1-5)"
    echo -e "$(printf %-16s ) \033[32m                               \033[34m1-5,10-15   表示1点到5点(包含1点和5点), 或者10点到15点可以跑, 需要注意都是闭区间的"
    echo -e "$(printf %-16s ) \033[32m                               \033[34m1,5,10,15   表示1点, 5点, 10点, 15点可以跑"
    echo
    echo -e "$(printf %-16s ) \033[32m[ -f true ]                    \033[33m是否执行check操作, 默认是false, 只有为true的时候才会check, -f stop 表示终止检查任务"
    echo -e "$(printf %-16s ) \033[32m[ -c checksize ]               \033[33m每次从数据库拿多少行数据进行比较, 默认10000, 不建议修改这个值到太大或者太小, 建议值是5000-50000, 较小的话校验时间较长, 较大的话容易影响数据库"
    echo -e "$(printf %-16s ) \033[32m[ -h ]                         \033[33m帮助信息"
    echo
    exit 1
}

while getopts :t:T:d:D:w:c:f:l:h OPTION
do
    case "$OPTION" in
        t)
            check_table_list="${OPTARG}"
            ;;
        T)
            skip_check_table_list="${OPTARG}"
            ;;
        d)
            check_db_list="${OPTARG}"
            ;;
        D)
            skip_check_db_list="${OPTARG}"
            ;;
        w)
            max_threads="${OPTARG}"
            ;;
        c)
            row_count="${OPTARG}"
            ;;
        f)
            check="${OPTARG}"
            ;;
        l)
            check_time="${OPTARG}"
            ;;
        h)f_usage;;
        :)echo -e "\n'-${OPTARG}' : 这个选项需要一个值";f_usage;;
        ?|*)echo -e "\n'-${OPTARG}' : 不识别这个选项.";f_usage;;
    esac
done

function f_prepare()
{
    #以下无需编辑修改
    func_dir="${work_dir}/func"
    conf_dir="${work_dir}/conf"
    log_dir="${work_dir}/log"
    list_dir="${log_dir}/list"
    res_dir="${log_dir}/res"
    md5_dir="${log_dir}/md5"
    res_row_dir="${res_dir}/row"
    res_diff_dir="${res_dir}/diff"
    pri_file_dir="${log_dir}/pri"
    res_table_dir="${res_dir}/table"

    stop_file="${log_dir}/stop_file"
    check_mark_file="${log_dir}/.check_mark_file"
    pause_file="${log_dir}/pause_file"
    conf_file="${conf_dir}/check.conf"
    log_file="${log_dir}/info.log"
    sql_md5_str="$(date +%N|md5sum|awk '{print $1}')"
    skip_file="${log_dir}/skip.log"

    if [ -f "${func_dir}/f_logging.sh" ]
    then
        . ${func_dir}/f_logging.sh
    else
        echo -e "\033[31m[$(date "+%F %H:%M:%S")] [ ERROR ] [ ${FUNCNAME}:${LINENO} ] 缺少必要的配置文件[${func_dir}/f_logging.sh\033[0m"
        exit 1
    fi

    if [ -d "${log_dir}" ] && [ "$(ls ${log_dir}|wc -l)x" != "0x" ]
    then
        f_logging "$(eval echo ${log_addr}):ERROR" "日志目录 '${log_dir}' 已经存在且不是空目录, 请清理或备份后重新运行" "2" "1"
    fi

    #创建依赖目录
    mkdir -p ${log_dir} ${md5_dir} ${list_dir} ${res_row_dir} ${pri_file_dir} ${res_table_dir} ${res_diff_dir}

    if [ -f "${conf_file}" ]
    then
        . ${conf_file}
    else
        f_logging "$(eval echo ${log_addr}):ERROR" "缺少必要的配置文件[${conf_file}" "2" "1"
    fi
    mysql_port="${mysql_port1}"
    host_list=(${mysql_host1} ${mysql_host2})
    mysql_comm="${mysql_path} -u${mysql_user} -p${mysql_passwd} -h${host_list[0]} -P${mysql_port1}"
    mysql_ver="$(${mysql_comm} -NBe "select version();" 2>/dev/null)" || true
    if [ -z "${mysql_ver}" ]
    then
        f_logging "$(eval echo ${log_addr}):ERROR" "Can't connect to MySQL server on '${mysql_user}@${mysql_host1}' for mysql_port:${mysql_port}" "2" "1"
    else
        cpu_count="$(cat /proc/cpuinfo |grep -c processor)" || true
        cpu_free="$(echo "$(top -bn 1|grep "%Cpu"|awk '{print $8/100}') ${cpu_count}"|awk '{print $1*$2}'|awk -v t="${threads}" -F. '{if (t>=$1-1) {
            print $1-1}else{print t}}')" || true
        if [ ${cpu_free} -lt ${threads} ]
        then
            f_logging "$(eval echo ${log_addr}):WARN" "并发数是${threads}, 但空闲CPU只有$((${cpu_free}+1))个, 现在将并发数强制变更为${cpu_free}."
            threads="${cpu_free}"
            if [ "${threads}x" == "0x" ]
            then
                f_logging "$(eval echo ${log_addr}):ERROR" "空闲CPU是${threads}, 现在退出运行." "2" "1"
            fi
        fi
        f_logging "$(eval echo ${log_addr}):INFO" "本次数据一致性检查开始"
    fi

    is_tidb="$(grep -c -- "-TiDB-" <<< "${mysql_ver}")" || true

    # 允许的进程数
    THREAD_NUM=${threads}
    pipe_file="${log_dir}/pipe-table.mark"
    [ -p "${pipe_file}" ] && rm -f "${pipe_file}"
    mkfifo ${pipe_file}
    # 定义描述符为3的管道
    exec 3<> ${pipe_file}

    # 预先写入指定数量的进程个数
    seq ${THREAD_NUM} 1>&3
}

function f_check_partition()
{
    free_space="$(df|grep ${log_partition} 2> /dev/null|awk -v size="${log_par_size}" '{if ($4/1024/1024>size){print 1}else print 0}')" || true
    if [ "${free_space}x" == "0x" ]
    then
        f_logging "WARN:${FUNCNAME}:${LINENO}" "该日志目录所在分区[${log_partition}]所剩空间少于${log_par_size}GB, 程序将退出" "2" "1"
    fi
}

function f_main()
{
    declare db_list
    f_prepare
    f_check_partition
    if [ -f "${func_dir}/f_check_diff_for_mysql.sh" ]
    then
        . ${func_dir}/f_switch_time.sh
        . ${func_dir}/f_check_diff_for_row.sh
        . ${func_dir}/f_check_diff_for_mysql.sh
    else
        f_logging "$(eval echo ${log_addr}):ERROR" "缺少必要的配置文件[${func_dir}/f_check_diff.sh" "2" "1"
    fi

    #以下这几个逻辑就是以命令行参数为准, 覆盖配置文件的配置
    [ -n "${row_count}" ] && max_count="${row_count}"
    [ -n "${max_threads}" ] && [ ${max_threads} -le ${threads} ] && threads="${max_threads}"
    [ -n "${check_table_list}" ] && check_table="${check_table_list}"
    [ -n "${skip_check_table_list}" ] && skip_check_table="${skip_check_table_list}"
    [ -n "${check_db_list}" ] && check_db="${check_db_list}"
    [ -n "${skip_check_db_list}" ] && skip_check_db="${skip_check_db_list}"

    if [ -z "${check_db}" ]
    then
        db_list=($(${mysql_comm} -NBe "show databases" 2>/dev/null|grep -vPi "$(echo ${skip_check_db}|sed 's/^/\^/g;s/$/\$/g;s/,/\$\|\^/g')")) || true
    else
        db_list=($(echo ${check_db}|tr "," " "))
    fi

    f_logging "$(eval echo ${log_addr}):WARN" "本次数据一致性检查将检查如下库 : [${db_list[*]}]"

    if [ "${check}x" == "stopx" ]
    then
        touch ${stop_file}
        f_logging "$(eval echo ${log_addr}):WARN" "终止所有数据一致性检查任务" "2"
        exit
    elif [ "${check}x" != "truex" ]
    then
        [ -p "${pipe_file}" ] && rm -f "${pipe_file}"
        f_logging "$(eval echo ${log_addr}):WARN" "如需继续检查请使用 -f true选项进行数据一致性检查, 其他帮助信息请使用 -h 选项进行查看" "2" "1"
    fi

    for db in ${db_list[@]}
    do
        [ ! -f "${check_mark_file}" ] && touch ${check_mark_file}
        log_file="${log_dir}/info.log"
        [ -f "${stop_file}" ] && break
        f_logging "$(eval echo ${log_addr}):INFO" "正在检查${db}库"
        table_list_file="${list_dir}/${db}.list"         #已经检查完成的表
        table_list_file_ing="${list_dir}/${db}_ing.list" #正在检查的表
        [ ! -f "${table_list_file}" ] && touch ${table_list_file}

        > ${table_list_file_ing}

        f_check_diff_for_mysql "${db}" || true
        echo -en "\033[0m"
        [ -f "${stop_file}" ] && break

        for table_name in $(ls -l ${error_log_table_dir} 2>/dev/null|awk '/.log/{print $NF}'|awk -F'.log' '{print $1}'|sort|uniq)
        do
            log_file="${log_dir}/diff.log"
            __="${db}.${table_name} ] [ 进行更细化分析不一致部分"
            f_logging "$(eval echo ${log_addr}):WARN" "${__}"
            f_check_diff_for_row "${db}" "${table_name}" || true
            echo -en "\033[0m"
            __="${db}.${table_name} ] [ 细化分析结束"
            f_logging "$(eval echo ${log_addr}):WARN" "${__}"
        done
    done

    [ -p "${pipe_file}" ] && rm -f "${pipe_file}"

    touch ${stop_file}_check_net

    if [ -f "${skip_file}" ] || [ -f "${stop_file}" ] || [ "$(ls ${res_diff_dir}|wc -l)x" != "0x" ]
    then
        f_logging "$(eval echo ${log_addr}):ERROR" "本次数据一致性检查完成 ] [ 不通过" "2"
        if [ -f "${skip_file}" ]
        then
            while read line
            do
                f_logging "$(eval echo ${log_addr}):ERROR" "${line}"
            done < ${skip_file}
        fi
        exit 1
    else
        f_logging "$(eval echo ${log_addr}):INFO" "本次数据一致性检查完成 ] [ 通过" "2"
    fi
}

f_main
